Code to be executed on the cluster can be installed beforehand, or can be provided dynamically in two ways:
A library of Python code can be imported into the client (i.e. here) and added to the cluster.
You would use this for a Python library that isn't (yet) formally installed on the slave nodes.
In [1]:
# Import library
import pe
# Add this same library to the cluster via the Spark context
sc.addPyFile('./pe.py')
help(pe)
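Optionally, you can confirm that the file was shipped: SparkFiles.get() resolves the local path of anything added with sc.addPyFile() or sc.addFile(). This quick check is a sketch and not part of the original analysis.
In [ ]:
# Optional check (sketch): resolve the local path of the distributed pe.py.
from pyspark import SparkFiles
SparkFiles.get('pe.py')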
This is a function that can be applied to each item in an RDD.
We define it here in the notebook; it can be serialised and sent out to be executed on the cluster slave nodes (a trivial illustration of this mechanism follows the next cell).
In [2]:
import h5py
def pe_shard(m, t, raw_file):
    h5f = h5py.File(raw_file.local_path())  # Open from locally cached file
    results = dict()
    for ds in h5f:
        inj = h5f[ds].attrs['INJ']
        fb = h5f[ds].attrs['FB']
        TS = h5f[ds][()]
        pe_result = pe.pe_single(TS, m, t)  # pe.pe_single(): added to cluster
        results[(inj, fb)] = pe_result
    h5f.close()
    return results
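As a trivial illustration of the serialisation mechanism (a sketch, unrelated to the analysis itself), any function or lambda defined in the notebook can be shipped to the workers in the same way:
In [ ]:
# Trivial sketch: this lambda is serialised on the client and executed on
# the cluster workers, just as pe_shard() will be in the map step below.
sc.parallelize(range(4), 2).map(lambda x: x * x).collect()  # [0, 1, 4, 9]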
We acquire a dataset from a repository.
In [3]:
from bdkd import datastore as ds
repo = ds.repositories().get('bdkd-laser-public')
dataset = repo.get('datasets/Sample dataset')
len(dataset.files)
Out[3]:
In this case we want to work on a subset of the files in the dataset. We can pick them by name: it so happens that the files we want have a name containing "FB_" so we can filter on that.
In [4]:
raw_files = dataset.files_matching('FB_')
len(raw_files)
Out[4]:
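Before distributing the work, pe_shard() can be sanity-checked locally on a single file. This is a sketch: it assumes raw_files is an indexable list, and the m and t values are only illustrative.
In [ ]:
# Local sanity check (sketch): run the shard function on one file only.
# m and t are illustrative; the cluster run below uses m=6, t=5.
sample_result = pe_shard(6, 5, raw_files[0])
len(sample_result)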
We can parallelise the list of files, creating what Spark calls a "resilient distributed dataset" (RDD). This is basically a dataset that can be spread around the cluster and acted upon.
In this case we will break it up into a number of pieces equal to the number of files (i.e. numSlices). This will cause each file to be processed individually. Slicing is a strategic decision: how you slice up your data may affect the efficiency of processing.
In [5]:
raw_rdd = sc.parallelize(raw_files, numSlices=len(raw_files))
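As an illustration of that choice (not used below), you could instead group several files per slice to reduce per-task scheduling overhead, at the cost of coarser-grained parallelism:
In [ ]:
# Illustrative alternative (sketch, not used here): roughly four files per slice.
coarse_rdd = sc.parallelize(raw_files, numSlices=max(1, len(raw_files) // 4))
coarse_rdd.getNumPartitions()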
This is where computation on the cluster happens.
Notes: the map step applies pe_shard() to each slice (here, each file) of the RDD on the cluster workers; the reduce step merges the per-file result dictionaries into a single dictionary, which is returned to the client.
In [24]:
m = 6 # order of permutation entropy
t = 5 # delay
### CLUSTER ###
pes = raw_rdd\
.map(lambda raw_file: pe_shard(m, t, raw_file))\
.reduce(lambda x, y: dict(x, **y))
### CLUSTER ###
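The reduce step merges the per-file dictionaries pairwise: dict(x, **y) builds a new dictionary containing the entries of both. A plain-Python illustration of the same merge, written in the copy-and-update form, with made-up keys and values:
In [ ]:
# Plain-Python illustration (sketch) of the merge performed in the reduce step.
a = {('inj_a', 'fb_a'): 0.42}
b = {('inj_b', 'fb_b'): 0.37}
merged = dict(a)
merged.update(b)   # b's entries would override a's on any key clash
merged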
Check that the size of the results is equal to the number of timeseries.
In [19]:
len(pes)
Out[19]:
The results we just generated are a mapping of (injection,feedback) --> entropy. We can turn this into a 2D array of injection x feedback.
In [25]:
(max_inj, max_fb) = sorted(pes.keys())[-1]
pe_array = []
for inj in range(max_inj + 1):
    row = []
    for fb in range(max_fb + 1):
        row.append(pes[(inj, fb)])
    pe_array.append(row)
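An equivalent construction with NumPy (a sketch; it assumes every (inj, fb) pair on the 0..max_inj x 0..max_fb grid is present in pes):
In [ ]:
# Equivalent construction with NumPy (sketch).
import numpy as np
pe_array_np = np.empty((max_inj + 1, max_fb + 1))
for (inj, fb), value in pes.items():
    pe_array_np[inj, fb] = value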
We can plot the permutation entropy calculations as a heat map.
First, we need the feedback and injection range maps provided with the dataset. These are stored in an HDF5 file called "maps.hdf5", which we can acquire by name and open.
In [22]:
import h5py
maps = h5py.File(dataset.file_ending('maps.hdf5').local_path())
FBT = maps['FBT_map.csv'][()]
INJ = maps['INJ_map.csv'][()]
### Contents of file:
# maps.items()
We create a heatmap plot of injection versus feedback versus permutation entropy (calculated above), with a color bar on the side.
The IPython magic "%matplotlib inline" allows us to display plots within this page.
In [28]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt  # needed for the plotting calls below
mapX = np.array(INJ)
mapY = np.array(FBT)
mapZ = np.array(pe_array) # calculated above
fig = plt.figure()
plt.pcolor(mapX, mapY, mapZ)
plt.axes().set_xlim(np.min(mapX), np.max(mapX))
plt.axes().set_ylim(np.min(mapY), np.max(mapY))
plt.colorbar()
Out[28]:
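If you want to keep the plot, the figure can also be written out to a file (optional; the filename is illustrative):
In [ ]:
# Optional (sketch): save the heat map next to the notebook.
fig.savefig('pe_heatmap.png', dpi=150, bbox_inches='tight')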